查看原文
其他

Istio源码解析系列part3—Mixer工作流程浅析

郑伟 几米宋 2022-09-07

作者:郑伟,小米信息部技术架构组

本文是Istio原文解析系列第三篇,查看本系列:

本系列文章主要从源码(35e2b904)出发,对istio做深入剖析,让大家对istio有更深的认知,从而方便平时排查问题。不了解Service Mesh和Istio的同学请先阅读敖小剑老师如下文章进行概念上的理解:

  • Service Mesh:下一代微服务

  • 服务网格新生代-Istio

本文主要对istio在ubuntu16.04下环境搭建做简单介绍,Mac用户和其他linux发行版用户请根据bash脚本做相应调整。

概念介绍

Mixer提供三个核心功能:

  • 前置条件检查(Precondition Checking):某一服务响应外部请求前,通过Envoy向Mixer发送Check请求,检查该请求是否满足一定的前提条件,包括白名单检查、ACL检查等。

  • 配额管理(Quota Management):当多个请求发生资源竞争时,通过配额管理机制可以实现对资源的有效管理。

  • 遥测报告上报(Telemetry Reporting):该服务处理完请求后,通过Envoy向Mixer上报日志、监控等数据。

要深入了解Mixer,我们先对如下几个概念做介绍:

Attribute(属性)

大部分attributes由Envoy提供。Istio用attributes来控制服务在Service Mesh中运行时行为。attributes是有名称和类型的元数据,用来描述入口和出口流量和流量产生时的环境。attributes携带了一些具体信息,比如:API请求状态码、请求响应时间、TCP连接的原始地址等。

RefrencedAttributes(被引用的属性)

refrencedAttributes是Mixer Check时进行条件匹配后被使用的属性的集合。Envoy向Mixer发送的Check请求中传递的是属性的全集,refrencedAttributes只是该全集中被应用的一个子集。

举个例子,Envoy某次发送的Check请求中发送的attributes为 {request.path:xyz/abc,request.size:234,source.ip:192.168.0.1},如Mixer中调度到的多个adapters只用到了 request.pathrequest.size这两个属性。那么Check后返回的refrencedAttributes为 {request.path:xyz/abc,request.size:234}

为防止每次请求时Envoy都向Mixer中发送Check请求,Mixer中建立了一套复杂的缓存机制,使得大部分请求不需要向Mixer发送Check请求。

  1. request.path: xyz/abc

  2. request.size: 234

  3. request.time: 12:34:56.789 04/17/2017

  4. source.ip: 192.168.0.1

  5. destination.service: example

属性词汇由 [_.a-z0-9]组成,其中 .为命名空间分隔符,所有属性词汇可以查看这里,属性类型可以查看这里。

Adapter(适配器)

Mixer是一个高度模块化、可扩展组件,内部提供了多个适配器(adapter)。

Envoy提供request级别的属性(attributes)数据。

adapters基于这些attributes来实现日志记录、监控指标采集展示、配额管理、ACL检查等功能。Istio内置的部分adapters举例如下:

  • circonus:一个微服务监控分析平台。

  • cloudwatch:一个针对AWS云资源监控的工具。

  • fluentd:一款开源的日志采集工具。

  • prometheus:一款开源的时序数据库,非常适合用来存储监控指标数据。

  • statsd:一款采集汇总应用指标的工具。

  • stdio:stdio适配器使Istio能将日志和metrics输出到本地,结合内置的ES、Grafana就可以查看相应的日志或指标了。



Template(模板)

对于一个网络请求,Mixer通常会调用两个rpc:Check和Report。不同的adapter需要不同的attributes,template定义了attributes到adapter输入数据映射的schema,一个适配器可以支持多个template。一个上报metric数据的模板如下所示:

  1. apiVersion: "config.istio.io/v1alpha2"

  2. kind: metric

  3. metadata:

  4.  name: requestsize

  5.  namespace: istio-system

  6. spec:

  7.  value: request.size | 0

  8.  dimensions:

  9.    source_service: source.service | "unknown"

  10.    source_version: source.labels["version"] | "unknown"

  11.    destination_service: destination.service | "unknown"

  12.    destination_version: destination.labels["version"] | "unknown"

  13.    response_code: response.code | 200

  14.  monitored_resource_type: '"UNSPECIFIED"'

模板字段的值可以是字面量或者表达式,如果时表达式,则表达式的值类型必须与字段的数据类型一致。

Mixer的配置模型

Mixer的yaml配置可以抽象成三种模型:HandlerInstanceRule这三种模型主要通过yaml中的kind字段做区分,kind值有如下几种:

  • adapter kind:表示此配置为Handler。

  • template kind:表示此配置为Template。

  • "rule":表示此配置为Rule。

Handler

一个Handler是配置好的Adpater的实例。Handler从yaml配置文件中取出adapter需要的配置数据。一个典型的Promethues Handler配置如下所示:

  1. apiVersion: config.istio.io/v1alpha2

  2. kind: prometheus

  3. metadata:

  4.  name: handler

  5.  namespace: istio-system

  6. spec:

  7.  metrics:

  8.  - name: request_count

  9.    instance_name: requestcount.metric.istio-system

  10.    kind: COUNTER

  11.    label_names:

  12.    - destination_service

  13.    - destination_version

  14.    - response_code

对于Handler而言, {metadata.name}.{kind}.{metadata.namespace}是其完全限定名(Fully Qualified name),上述Handler的完全限定名是handler.prometheus.istio-system,完全限定名是全局唯一的。

adapter的配置信息定义在spec段中,每个adapter配置的格式都有所区别,可以从这里查看指定的adapter配置格式。上述Handler中引用了 requestduration.metric.istio-system这个Instance。

Instance

Instance定义了attributes到adapter输入的映射,一个处理requestduration metric数据的Instance配置如下所示:

  1. apiVersion: config.istio.io/v1alpha2

  2. kind: metric

  3. metadata:

  4.  name: requestduration

  5.  namespace: istio-system

  6. spec:

  7.  value: response.duration | "0ms"

  8.  dimensions:

  9.    destination_service: destination.service | "unknown"

  10.    destination_version: destination.labels["version"] | "unknown"

  11.    response_code: response.code | 200

  12.  monitored_resource_type: '"UNSPECIFIED"'

上述Instance的完全限定名是requestduration.metric.istio-system,Handler和Rule可以通过这个名称对此Instance进行引用。

Rule

Rule定义了一个特定的Instance何时调用一个特定的Handler,一个典型的Rule配置如下所示:

  1. apiVersion: config.istio.io/v1alpha2

  2. kind: rule

  3. metadata:

  4.  name: promhttp

  5.  namespace: istio-system

  6. spec:

  7.  match: destination.service == "service1.ns.svc.cluster.local" && request.headers["x-user"] == "user1"

  8.  actions:

  9.  - handler: handler.prometheus

  10.    instances:

  11.    - requestduration.metric.istio-system

上述例子中,定义的Rule为:对目标服务为 service1.ns.svc.cluster.localrequest.headers["x-user"]user1的请求,Instance: requestduration.metric.istio-system才调用Handler: handler.prometheus

Mixer工作流程源码分析

上面简单介绍了Mixer相关概念,下面我们从源码出发来对Mixer工作流程做分析。

编译mixer二进制文件和docker镜像

先看Makfile:

  1. ···

  2. MIXER_GO_BINS:=${ISTIO_OUT}/mixs ${ISTIO_OUT}/mixc

  3. mixc: # Mixer客户端,通过mixc我们可以和运行的mixer进行交互。

  4.    bin/gobuild.sh ${ISTIO_OUT}/mixc istio.io/istio/pkg/version ./mixer/cmd/mixc

  5. mixs: # Mixer服务端,和Envoy、adapter交互。部署Istio的时候随之启动。

  6.    bin/gobuild.sh ${ISTIO_OUT}/mixs istio.io/istio/pkg/version ./mixer/cmd/mixs

  7. ···

  8. include tools/istio-docker.mk # 引入编译docker镜像的Makefile文件。

  9. ...

Makefile中定义了 mixs(mixer server)mixc(mixer client)的编译流程。使用指令 make mixs mixc编译好二进制文件后,再编译docker镜像。 istio-docker.mk中编译mixer镜像相关指令如下:

  1. ...

  2. MIXER_DOCKER:=docker.mixer docker.mixer_debug

  3. $(MIXER_DOCKER): mixer/docker/Dockerfile$$(suffix $$@) \

  4.        $(ISTIO_DOCKER)/ca-certificates.tgz $(ISTIO_DOCKER)/mixs | $(ISTIO_DOCKER)

  5.    $(DOCKER_RULE)

  6. ...

执行 make docker.mixer会在本地编译mixer镜像,依据的dockerfile是 mixer/docker/Dockerfile.mixer,如下所示:

  1. FROM scratch

  2. # obtained from debian ca-certs deb using fetch_cacerts.sh

  3. ADD ca-certificates.tgz /

  4. ADD mixs /usr/local/bin/

  5. ENTRYPOINT ["/usr/local/bin/mixs", "server"]

  6. CMD ["--configStoreURL=fs:///etc/opt/mixer/configroot","--configStoreURL=k8s://"]

可以知道容器启动时执行的mixs指令为 /usr/local/bin/mixs server--configStoreURL=fs:///etc/opt/mixer/configroot --configStoreURL=k8s://

Mixer Server启动流程

mixs启动入口:

  1. // supportedTemplates 从mixer/pkg/template包获取所有注册的模板信息。

  2. func supportedTemplates() map[string]template.Info {

  3.    return generatedTmplRepo.SupportedTmplInfo

  4. }

  5. // supportedAdapters 从mixer/pkg/adapter包获取所有注册的适配器信息。

  6. func supportedAdapters() []adptr.InfoFn {

  7.    return adapter.Inventory()

  8. }

  9. func main() {

  10.    // 构造cobra.Command实例,mixs server子命令设计在serverCmd中定义。

  11.    rootCmd := cmd.GetRootCmd(os.Args[1:], supportedTemplates(), supportedAdapters(), shared.Printf, shared.Fatalf)

  12.    if err := rootCmd.Execute(); err != nil {

  13.        os.Exit(-1)

  14.    }

  15. }

mixs server子命令在 istio/mixer/cmd/mixs/cmd/server.go#serverCmd中定义:

  1. func serverCmd(info map[string]template.Info, adapters []adapter.InfoFn, printf, fatalf shared.FormatFn) *cobra.Command {

  2.    ...

  3.    serverCmd := &cobra.Command{

  4.        Use:   "server",

  5.        Short: "Starts Mixer as a server",

  6.        Run: func(cmd *cobra.Command, args []string) {

  7.            // 用户执行mixs server命令时,启动mixer gRPC server

  8.            runServer(sa, printf, fatalf)

  9.        },

  10.    }

  11.    ...

  12. }

  13. // runServer函数启动mixer gRPC server

  14. func runServer(sa *server.Args, printf, fatalf shared.FormatFn) {

  15.    s, err := server.New(sa)

  16.    ...

  17.    s.Run()

  18.    ...

  19. }

gRPC server启动主要逻辑在 istio/mixer/pkg/server/server.go#newServer

  1. func newServer(a *Args, p *patchTable) (*Server, error) {

  2.    ...

  3.    s := &Server{}

  4.    // 初始化API worker线程池

  5.    s.gp = pool.NewGoroutinePool(apiPoolSize, a.SingleThreaded)

  6.    s.gp.AddWorkers(apiPoolSize)

  7.    // 初始化adapter worker线程池

  8.    s.adapterGP = pool.NewGoroutinePool(adapterPoolSize, a.SingleThreaded)

  9.    s.adapterGP.AddWorkers(adapterPoolSize)

  10.    // 构造存放Mixer模板仓库

  11.    tmplRepo := template.NewRepository(a.Templates)

  12.    // 构造存放adapter的map

  13.    adapterMap := config.AdapterInfoMap(a.Adapters, tmplRepo.SupportsTemplate)

  14.    ...

  15.    // 构造Mixer runtime实例。runtime实例是Mixer运行时环境的主要入口。

  16.    // 它会监听配置变更,配置变更时会动态构造新的handler实例和dispatcher实例。

  17.    // dispatcher会基于配置和attributes对请求进行调度,调用相应的adapters处理请求。

  18.    rt = p.newRuntime(st, templateMap, adapterMap, a.ConfigIdentityAttribute, a.ConfigDefaultNamespace,

  19.        s.gp, s.adapterGP, a.TracingOptions.TracingEnabled())

  20.    // runtime实例开始监听配置变更,一旦配置变更,runtime实例会构造新的dispatcher。

  21.    p.runtimeListen(rt)

  22.    s.dispatcher = rt.Dispatcher()

  23.    ...

  24.    // 注册Mixer gRPC server

  25.    mixerpb.RegisterMixerServer(s.server, api.NewGRPCServer(s.dispatcher, s.gp))

  26.    // 启动ControlZ监听器,ControlZ提供了Istio的内省功能。Mixer与ctrlz集成时,会启动一个

  27.    // web service监听器用于展示Mixer的环境变量、参数版本信息、内存信息、进程信息、metrics等。

  28.    go ctrlz.Run(a.IntrospectionOptions, nil)

  29.    return s, nil

  30. }

其中 istio/mixer/pkg/api/grpcServer.go#NewGRPCServer函数中初始化了保存attributes的list和全局字典

  1. func NewGRPCServer(dispatcher dispatcher.Dispatcher, gp *pool.GoroutinePool) mixerpb.MixerServer {

  2.    // 从globalList拷贝出list切片,list形如[]string{"source.ip","source.port","request.id"...}

  3.    list := attribute.GlobalList()

  4.    // 将以attribute.name作为key,index作为value,构造map。形如:map[string][int]{"source.ip":1, "source.port":2, "request.id":3...}

  5.    globalDict := make(map[string]int32, len(list))

  6.    for i := 0; i < len(list); i++ {

  7.        globalDict[list[i]] = int32(i)

  8.    }

  9.    return &grpcServer{

  10.        dispatcher:     dispatcher,

  11.        gp:             gp,

  12.        globalWordList: list,

  13.        globalDict:     globalDict,

  14.    }

  15. }

Mixer启动的gRPC server定义了两个rpc:Check、Report。

istio/vendor/istio.io/api/mixer/v1/service.proto#48行

  1. service Mixer {

  2.  // Check 基于活动配置和Envoy提供的attributes,执行前置条件检查和配额管理。

  3.  rpc Check(CheckRequest) returns (CheckResponse) {}

  4.  // Reports 基于活动配置和Envoy提供的attribues上报遥测数据(如logs和metrics)。

  5.  rpc Report(ReportRequest) returns (ReportResponse) {}

  6. }

CheckRequest、CheckResponse结构如下所示:

  1. message CheckRequest {

  2.  // QuotaParams 定义了配额管理相关的参数。

  3.  message QuotaParams {

  4.    int64 amount = 1;     // amount 为可分配的配额总数

  5.    bool best_effort = 2; // best_effort 为真时,表示返回的配额数小于请求的配额数

  6.  }

  7.  // CompressedAttributes 为压缩过的本次请求的attributes

  8.  CompressedAttributes attributes = 1 [(gogoproto.nullable) = false];

  9.  // global_word_count 为attribute字典单词总数,用于判断客户端和Mixer gRPC server所用的字典是否同步

  10.  uint32 global_word_count = 2;

  11.  // deduplication_id 用于某次rpc请求失败后重试

  12.  string deduplication_id = 3;

  13.  //  quotas 进行分配的配额表,key为用户自定义的配额名如“requestCount”

  14.  map<string, QuotaParams> quotas = 4 [(gogoproto.nullable) = false];

  15. }

  16. message CheckResponse {

  17.  // PreconditionResult 前置条件检查结果

  18.  message PreconditionResult {

  19.    // status 请求结果状态码,0表示成功

  20.    google.rpc.Status status = 1 [(gogoproto.nullable) = false];

  21.    // valid_duration 用于判断本次结果是否合法的时间总数

  22.    google.protobuf.Duration valid_duration = 2 [(gogoproto.nullable) = false, (gogoproto.stdduration) = true];

  23.    // valid_use_count 用于判断本次结果是否合法的使用次数总数

  24.    int32 valid_use_count = 3;

  25.    // CompressedAttributes 返回的attributes数据,是请求的attributes和Mixer配置产生的attributes的集合

  26.    CompressedAttributes attributes = 4 [(gogoproto.nullable) = false];

  27.    // ReferencedAttributes Mixer adapters引用过的attritbues

  28.    ReferencedAttributes referenced_attributes = 5 [(gogoproto.nullable) = false];

  29.  }

  30.  // QuotaResult 配额检查结果

  31.  message QuotaResult {

  32.    google.protobuf.Duration valid_duration = 1 [(gogoproto.nullable) = false, (gogoproto.stdduration) = true];

  33.    // 授予的配额总数

  34.    int64 granted_amount = 2;

  35.    ReferencedAttributes referenced_attributes = 5 [(gogoproto.nullable) = false];

  36.  }

  37.  PreconditionResult precondition = 2 [(gogoproto.nullable) = false];

  38.  map<string, QuotaResult> quotas = 3 [(gogoproto.nullable) = false];

  39. }

ReportRequest、ReportResponse结构如下所示:

  1. message ReportRequest {

  2.  // CompressedAttributes 本次请求的attributes数据

  3.  repeated CompressedAttributes attributes = 1 [(gogoproto.nullable) = false];

  4.  // default_words 默认的message级别的attributes字典

  5.  repeated string default_words = 2;

  6.  // global_word_count 全局attribute字典总数

  7.  uint32 global_word_count = 3;

  8. }

  9. message ReportResponse {

  10. }

Check请求执行细节

  1. func (s *grpcServer) Check(legacyCtx legacyContext.Context, req *mixerpb.CheckRequest) (*mixerpb.CheckResponse, error) {

  2.    // 构造基于proto的属性包protoBag。protoBag提供了对一组attributes进行访问、修改的机制。

  3.    protoBag := attribute.NewProtoBag(&req.Attributes, s.globalDict, s.globalWordList)

  4.    defer protoBag.Done()

  5.    // 构造可变的(执行check方法后会变化)属性包checkBag

  6.    checkBag := attribute.GetMutableBag(protoBag)

  7.    defer checkBag.Done()

  8.    // 执行dispatcher的预处理过程,s.dispatcher为runtime实例impl。

  9.    // impl的Preprocess方法会调度生成属性相关的adapter,比如kubernetes adapter。

  10.    s.dispatcher.Preprocess(legacyCtx, protoBag, checkBag);

  11.    // 获取属性包中被引用的属性快照snapApa,snapApa能在每次check和quota处理中重复使用。

  12.    snapApa := protoBag.SnapshotReferencedAttributes()

  13.    // 执行dispatcher的前置条件检查,Check方法内部会计算被引用的属性并同步到protoBag中。

  14.    cr, err := s.dispatcher.Check(legacyCtx, checkBag)

  15.    ...

  16.    // 构造Check rpc response实例

  17.    resp := &mixerpb.CheckResponse{

  18.        Precondition: mixerpb.CheckResponse_PreconditionResult{

  19.            ValidDuration:        cr.ValidDuration,

  20.            ValidUseCount:        cr.ValidUseCount,

  21.            Status:               cr.Status,

  22.            ReferencedAttributes: protoBag.GetReferencedAttributes(s.globalDict, globalWordCount),

  23.        },

  24.    }

  25.    // 如果前置条件检查通过且配额表总数大于0,则计算新的配额

  26.    if status.IsOK(resp.Precondition.Status) && len(req.Quotas) > 0 {

  27.        resp.Quotas = make(map[string]mixerpb.CheckResponse_QuotaResult, len(req.Quotas))

  28.        // 遍历配额表,计算每个配额是否为引用配额

  29.        for name, param := range req.Quotas {

  30.            qma := &dispatcher.QuotaMethodArgs{

  31.                Quota:           name,

  32.                Amount:          param.Amount,

  33.                DeduplicationID: req.DeduplicationId + name,

  34.                BestEffort:      param.BestEffort,

  35.            }

  36.            protoBag.RestoreReferencedAttributes(snapApa)

  37.            crqr := mixerpb.CheckResponse_QuotaResult{}

  38.            var qr *adapter.QuotaResult

  39.            // 执行dispacher的配额处理方法。istio/mixer/pkg/runtime/dispatcher/dispatcher.go#func (d *Impl) Quota()

  40.            qr, err = s.dispatcher.Quota(legacyCtx, checkBag, qma)

  41.            if err != nil {

  42.                err = fmt.Errorf("performing quota alloc failed: %v", err)

  43.                log.Errora("Quota failure:", err.Error())

  44.            } else if qr == nil {

  45.                crqr.ValidDuration = defaultValidDuration

  46.                crqr.GrantedAmount = qma.Amount

  47.            } else {

  48.                if !status.IsOK(qr.Status) {

  49.                    log.Debugf("Quota denied: %v", qr.Status)

  50.                }

  51.                crqr.ValidDuration = qr.ValidDuration

  52.                crqr.GrantedAmount = qr.Amount

  53.            }

  54.            // 根据全局attribute字典来计算被引用的attributes

  55.            crqr.ReferencedAttributes = protoBag.GetReferencedAttributes(s.globalDict, globalWordCount)

  56.            resp.Quotas[name] = crqr

  57.        }

  58.    }

  59.    // 返回Check gRPC相应结果

  60.    return resp, nil

  61. }

Report请求执行整体逻辑和Check相似,本文暂不做解析。

Mixer适配器工作流程

  • Mixer server启动。

  • 初始化adapter worker线程池。

  • 初始化Mixer模板仓库。

  • 初始化adapter builder表。

  • 初始化runtime实例。

  • 注册并启动gRPC server。

  • 某一服务外部请求被envoy拦截,envoy根据请求生成指定的attributes,attributes作为参数之一向Mixer发起Check rpc请求。

  • Mixer 进行前置条件检查和配额检查,调用相应的adapter做处理,并返回相应结果。

  • Envoy分析结果,决定是否执行请求或拒绝请求。若可以执行请求则执行请求。请求完成后再向Mixer gRPC服务发起Report rpc请求,上报遥测数据。

  • Mixer后端的adapter基于遥测数据做进一步处理。


点击【阅读原文】跳转到ServiceMesher社区官网,阅读体验更佳。


您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存